Spark SQL查询解析

1 文本到执行计划

两个阶段:

  • 词法分析:从SQL字符串到词法单元(Token)序列,使用词法分析器Lexer
  • 语法分析:从词法单元序列到抽象语法树(AST),使用语法分析器Parser

(原文此处为解析流程示意图——SQL 文本经 Lexer、Parser 生成抽象语法树;原图在抓取时丢失)

// Source code (quoted from Apache Spark; comments translated and annotated)
// org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse
// Drives one parse of `command`: builds the ANTLR lexer/parser pair, installs
// Spark's error handling, then hands the configured parser to `toResult`, which
// extracts the caller-specific result (a plan, an expression, a data type, ...).
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
// UpperCaseCharStream presumably normalizes case so SQL keywords match
// case-insensitively — name-based inference, confirm against Spark source.
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
// Replace ANTLR's default console error listener with Spark's ParseErrorListener.
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)

val tokenStream = new CommonTokenStream(lexer)
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
// Propagate SQL-config flags into the generated parser's feature switches.
parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
parser.SQL_standard_keyword_behavior = conf.ansiEnabled

try {
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// if we fail, parse with LL mode (slower, but handles the full grammar)
tokenStream.seek(0) // rewind input stream
parser.reset()

// Try Again.
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
}
catch {
// A ParseException that already carries the offending command is rethrown as-is;
// otherwise attach the original SQL text so the error message shows it.
case e: ParseException if e.command.isDefined =>
throw e
case e: ParseException =>
throw e.withCommand(command)
// Surface analysis-time failures as ParseExceptions carrying the source position.
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position)
}
}
// Execution results as seen from unit tests
behavior of "parser"

it should "convert SELECT into a LogicalPlan" in {
  // Raw SQL text parses into an unresolved logical plan; unresolved nodes are
  // rendered with a leading apostrophe in the plan's string form.
  val plan = sparkSession.sessionState.sqlParser.parsePlan("SELECT * FROM numbers WHERE nr > 1")
  val rendered = plan.toString()

  rendered should include ("'Project [*]")
  rendered should include ("+- 'Filter ('nr > 1)")
  rendered should include (" +- 'UnresolvedRelation `numbers`")
}

it should "convert filter into an Expression" in {
  // A bare predicate parses to a comparison between an unresolved column
  // reference (left) and a literal (right).
  val expr = sparkSession.sessionState.sqlParser.parseExpression("nr > 1")

  expr shouldBe a [GreaterThan]
  val comparison = expr.asInstanceOf[GreaterThan]
  comparison.left shouldBe a [UnresolvedAttribute]
  comparison.right shouldBe a [Literal]
}

it should "convert data type into Spark StructType" in {
  // FIX: the original called parseDataType("struct") — the angle-bracketed type
  // arguments were stripped (most likely by HTML extraction treating them as a tag),
  // so the bare "struct" could never yield the asserted two-field schema.
  // Restored the full DDL form with the fields the assertion expects.
  val parsedType = sparkSession.sessionState.sqlParser.parseDataType("struct<nr: INT, letter: STRING>")

  parsedType.toString shouldEqual "StructType(StructField(nr,IntegerType,true), StructField(letter,StringType,true))"
}


it should "convert data type into a schema" in {
  // A comma-separated column list parses to a StructType with nullable fields.
  val schema = sparkSession.sessionState.sqlParser.parseTableSchema("nr INTEGER, letter STRING")
  val rendered = schema.toString

  rendered shouldEqual "StructType(StructField(nr,IntegerType,true), StructField(letter,StringType,true))"
}

参考资料